跳到主要内容

Java 操作 ElasticSearch

配置环境

通过官方文档可以得知,现在存在至少三种Java客户端。

  • Transport Client
  • Java High Level REST Client
  • Java Low Level Rest Client

造成这种混乱的原因是:

1、长久以来,ES 并没有官方的 Java 客户端,并且 Java 自身是可以简单支持 ES 的 API 的,于是就先做成了 TransportClient。但是 TransportClient 的缺点是显而易见的,它没有使用 RESTful 风格的接口,而是二进制的方式传输数据。

2、之后 ES 官方推出了 Java Low Level REST Client,它支持 RESTful,用起来也不错。但是缺点也很明显,因为TransportClient 的使用者把代码迁移到 Low Level REST Client 的工作量比较大。官方文档专门为迁移代码出了一堆文档来提供参考。

3、现在 ES 官方推出 Java High Level REST Client,它是基于 Java Low Level REST Client 的封装,并且 API 接收参数和返回值和 TransportClient 是一样的,使得代码迁移变得容易并且支持了 RESTful 的风格,兼容了这两种客户端的优点。当然缺点是存在的,就是版本的问题。ES 的小版本更新非常频繁,在最理想的情况下,客户端的版本要和 ES 的版本一致(至少主版本号一致),次版本号不一致的话,基本操作也许可以,但是新 API 就不支持了。

强烈建议 ES5 及其以后的版本使用Java High Level REST Client。其主要导入如下两个依赖,注意依赖要和 ES 版本一致

<!--es-->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.11</version>
</dependency>
<!--es的高级api-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.11</version>
</dependency>

创建一个客户端用来连接服务

public class ESClient {
public static RestHighLevelClient getClient(){
// 指定es服务器的ip,端口
HttpHost httpHost = new HttpHost("localhost",9200);

// 创建 RestClientBuilder
RestClientBuilder builder = RestClient.builder(httpHost);
return new RestHighLevelClient(builder);
}
}

连接测试:

class ESClientTest {

@Test
public void testConnect() {
RestHighLevelClient client = ESClient.getClient(); // 如果没有连接成功会抛异常的
System.out.println("ok!");
}
}

操作索引

创建 Index

注意,因为后续版本不再使用 type了,所以这里也没有提供创建 type 的方式

这里有三种创建方式:

第一种使用 Builder 的方式

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

// ...

private RestHighLevelClient client = ESClient.getClient();
private String index = "person";
private String type = "man";

//创建索引
@Test
public void createIndex() throws IOException {
// 1、创建索引
Settings.Builder settings = Settings.builder()
.put("number_of_shards", 5)
.put("number_of_replicas", 1);

// 2、准备索引的结构 mappings 这里就是构建一个索引结构
XContentBuilder builder = XContentFactory.jsonBuilder()
// 这个 start end 是成对出现的
.startObject()
.startObject("properties")
.startObject("name")
.field("type", "text")
.endObject()
.startObject("age")
.field("type", "integer")
.endObject()
.startObject("birthday")
.field("type", "date")
// 这里格式前需要加上一个 "8" 否则可能会报错
.field("format", "8yyyy-MM-dd")
.endObject()
.endObject()
.endObject();

// 3、将 settings 和 mappings 封装为 Request 对象
CreateIndexRequest request = new CreateIndexRequest(index)
.settings(settings)
.mapping(builder);

// 4、通过 client 对象去连接 ES 并创建索引
CreateIndexResponse res = client.indices().create(request, RequestOptions.DEFAULT);

// 打印是否连接成功
System.out.println(res.toString());
}

创建出来的结果如下

{
"mapping": {
"_doc": {
"properties": {
"age": {
"type": "integer"
},
"birthday": {
"type": "date",
"format": "8yyyy-MM-dd"
},
"name": {
"type": "text"
}
}
}
}
}

可以看到,没有指定 index,默认的 type 是 "_doc"

第二种以 Map 创建的方式

//CreateIndexRequest 实例
CreateIndexRequest request = new CreateIndexRequest(indexName);

//封装属性 类似于json格式
Map<String, Object> jsonMap = new HashMap<>();
Map<String, Object> properties = new HashMap<>();
Map<String, Object> content = new HashMap<>();
Map<String, Object> account = new HashMap<>();

content.put("type", "integer");
content .put("type", "text");
content .put("analyzer", "ik_max_word");
properties.put("id", content);
properties.put("account", account);
jsonMap.put("properties", properties);

//设置分片
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2)
);

request.mapping(jsonMap);

//使用的同步的方式 异步请参考官方文档
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
if (!createIndexResponse.isAcknowledged()){
throw new BusinessException("创建索引失败");
}

第三种以 json 的方式

request.mapping(
"{\n" +
" \"properties\": {\n" +
" \"message\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}",
XContentType.JSON);

CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);

if (!createIndexResponse.isAcknowledged()){
throw new BusinessException("创建索引失败");
}

注意,新版(7.x)不再支持一个 index 下多个 type 了,有多少个 type 就需要创建多少个新的 index

检查索引是否存在

private String index = "person";

// 检查索引是否存在
@Test
public void test2() throws IOException {
// 准备request对象
GetIndexRequest request = new GetIndexRequest(index);
// 通过client对象操作
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
// 输出,
System.out.println(exists);
}

删除索引

这里结果拿不拿到无所谓,因为删除失败直接就抛异常了

private String index = "person";

// 删除索引
@Test
public void deleteIndex() throws IOException {
// 准备request对象
DeleteIndexRequest request = new DeleteIndexRequest();
request.indices(index);

//通过client对象操作
AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);

// 拿的是否删除成功的结果,是个布尔类型的值
System.out.println(delete.isAcknowledged());
}

操作文档

添加 JSON 依赖

因为这里需要解析 JSON 所以需要导入一个 Java 的 JSON 解析库

<!--jackson-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.4</version>
</dependency>

准备实体类

准备一个实体类,因为,ES 的 id 是在路径上的,因此不需要存储 @JsonIgnore 注解忽略这个属性,然后将 Data 类型转为 ES 的这种类型 @JsonFormat(pattern = "yyyy-MM-dd") 注解

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person {
@JsonIgnore
private Integer id;

private String name;
private Integer age;

@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date birthday;
}

创建文档

这里要注意:因为上面创建 index 没有指定 type 所以它默认是 "_doc"

所以如下创建一个 doc

private RestHighLevelClient client = ESClient.getClient();
private String index = "person";

// 文档创建
@Test
void createDoc() throws IOException {
// jackson
ObjectMapper mapper = new ObjectMapper();

// 1 准备一个 json 数据
Person person = new Person(1, "张三", 20, new Date());
String json = mapper.writeValueAsString(person);
System.out.println(json);
// 2 request 对象,手动指定 id,使用 person 对象的 id
IndexRequest request = new IndexRequest(index);
// 这里可以直接像下面这样写
// IndexRequest request1 = new IndexRequest(index, "_doc", person.getId().toString());
// 因为上面创建 index 没有指定 type 所以它默认是 "_doc"
request.type("_doc");
request.id(person.getId().toString());
request.source(json, XContentType.JSON);// 第二个参数告诉他这个参数是json类型
// 3 通过 client 操作
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
// 4 创建成功返回的结果
String result = response.getResult().toString();
System.out.println(result); // 成功会返回 CREATED
}

这时检查 kibana 可以发现刚插入的数据

修改文档

// 文档修改
@Test
void updateDoc() throws IOException {
// 1 创建一个map
Map<String, Object> doc = new HashMap<>();
doc.put("name", "李四");
String docId = "1";
// 2 创建一个 request 对象,指定要修改哪个,这里指定了 index,type 和 doc 的 Id,也就是确定唯一的 doc
UpdateRequest request = new UpdateRequest(index, "_doc", docId);
// 指定修改的内容
request.doc(doc);
// 3 client对象执行
UpdateResponse update = client.update(request, RequestOptions.DEFAULT);
// 4 执行返回的结果
String result = update.getResult().toString();
System.out.println(result); // 返回结果为 UPDATE
}

回到 kibana 可以发现数据更改了

删除文档

// 删除文档
@Test
public void deleteDoc() throws IOException {
// 指定要删除 1号文档
DeleteRequest request = new DeleteRequest(index,"_doc" , "1");
// 通过client执行
DeleteResponse delete = client.delete(request, RequestOptions.DEFAULT);
// 获取执行结果
String result = delete.getResult().toString();
System.out.println(result); // 返回结果为 DELETED
}

批量操作

批量添加

private RestHighLevelClient client = ESClient.getClient();
private String index = "person";
private String type = "_doc";

// 创建批量操作
@Test
public void bulkCreateDoc() throws IOException {
// 准备多个json数据
Person p1 = new Person(1,"张三",22,new Date());
Person p2 = new Person(2,"李四",22,new Date());
Person p3 = new Person(3,"王五",22,new Date());
// 转为json
ObjectMapper mapper = new ObjectMapper();
String json1 = mapper.writeValueAsString(p1);
String json2 = mapper.writeValueAsString(p2);
String json3 = mapper.writeValueAsString(p2);
// request,将数据封装进去
BulkRequest request = new BulkRequest();
request.add(new IndexRequest(index,type,p1.getId().toString()).source(json1,XContentType.JSON));
request.add(new IndexRequest(index,type,p2.getId().toString()).source(json2,XContentType.JSON));
request.add(new IndexRequest(index,type,p3.getId().toString()).source(json3,XContentType.JSON));
// client执行
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
}

批量删除

// 批量删除
@Test
public void bulkDeleteDoc() throws IOException {
BulkRequest request = new BulkRequest();
// 将要删除的doc的id添加到request
request.add(new DeleteRequest(index,type,"1"));
request.add(new DeleteRequest(index,type,"2"));
request.add(new DeleteRequest(index,type,"3"));
// client执行
client.bulk(request,RequestOptions.DEFAULT);
}

Reference

参考资料 千锋 Elasticsearch教学视频 参考资料 Elasticsearch Java API的基本使用